#!/usr/bin/env lua
--
-- Simple chat server that broadcasts user messages to everybody connected,
-- and accepts simple commands.
--
-- Provides simple example of combining sockets, condition variables, and a
-- basic flow control methodology.
--
-- Also provides an example of signal management and a dead-man switch using
-- pre-emptive threads.
--
local cqueues = require"cqueues"
local errno = require"cqueues.errno"
local socket = require"cqueues.socket"
local signal = require"cqueues.signal"
local thread = require"cqueues.thread"
local condition = require"cqueues.condition"

local EAGAIN = errno.EAGAIN
local EPIPE = errno.EPIPE
local poll = cqueues.poll
local sleep = cqueues.sleep
local monotime = cqueues.monotime


local port = socket.listen("0.0.0.0", ... or 6667) -- IRC port :)
local loop = cqueues.new()

local MaxIdle = 300
local Streams = { } -- socket -> user mapping
local Users = { } -- nickname -> user mapping


--
-- simple fifo module
--
-- ------------------------------------------------------------------------
local fifo = {}

function fifo.new()
	local self = { condvar = condition.new(), count = 0 }

	return setmetatable(self, { __index = fifo }) 
end -- fifo.new

function fifo:put(msg)
	local tail = { data = msg }

	if self.tail then
		self.tail.next = tail
		self.tail = tail
	else
		self.head = tail
		self.tail = tail
	end

	self.count = self.count + 1

	self:signal()
end -- fifo:put

function fifo:get()
	if self.head then
		local head = self.head

		self.head = head.next

		if not self.head then
			self.tail = nil
		end

		assert(self.count > 0)
		self.count = self.count - 1

		return head.data
	end	

	assert(self.count == 0)
end -- fifo:get

function fifo:signal()
	self.condvar:signal()
end -- fifo:signal

function fifo:getcv()
	return self.condvar
end -- fifo:getcv


--
-- simple chat session object
--
-- ------------------------------------------------------------------------
local chat = {}

function chat.new(con)
	local self = { }
	local _, ip, port = con:peername()

	self.socket = con
	self.from = string.format("[%s]:%d", ip, port)
	self.msgs = fifo.new()
	self.eof = false --> flag to tell loop to exit
	self.lastt = monotime() --> last transmission time

	return setmetatable(self, { __index = chat })
end -- chat.new

function chat:greet()
	self.socket:write"# What's your nickname?\n"

	self.nick = assert(self.socket:read"*l")

	self.socket:write(string.format("# Hello %s\n", self.nick))
	self.socket:write"# commands: /die /quit /who\n"

	return self.nick
end -- chat:greet

function chat:pollfd()
	return self.socket:pollfd()
end -- chat:pollfd

function chat:events()
	-- NOTE: this would need to be changed to something more
	-- sophisticated if we changed our flow control protocol in
	-- chat:loop.
	return self.socket:events()
end -- chat:events

function chat:tryrecv()
	repeat
		local msg, why = self.socket:recv"*L"

		if msg then
			local cmd = string.match(msg, "^/(%w+)")

			if cmd then
				cmd = string.lower(cmd)

				if cmd == "quit" then
					self.socket:write"# goodbye\n"
					self:exit()

					return true
				elseif cmd == "who" then
					for nick, user in pairs(Users) do
						self.socket:write(string.format("# %s from %s\n", nick, user.from))
					end
				elseif cmd == "die" then
					signal.raise(signal.SIGTERM)
				end
			else
				for _, user in pairs(Users) do
					user:put(msg)
				end
			end

			self.lastt = monotime()
		elseif why ~= EAGAIN and why ~= EPIPE then
			return false, why
		end
	until not msg

	return true
end -- chat:tryrecv

function chat:trysend(flush)
	repeat
		local msg = self.msgs:get()

		if msg then
			self.socket:send(msg, 1, #msg)
		end
	until not msg

	--
	-- Use a yielding flush (blocking our immediate coroutine) to the
	-- socket to simplify things.
	--
	-- If not, we'll have to synthesis an event mask by combining the
	-- event states after both :recv and :send, which would normally be
	-- either "r" if send flushed, or "rw" if send didn't flush. But
	-- note that if using SSL, read attempts can trigger writes, and
	-- vice-versa, if the transport channel does a key exchange in the
	-- middle of the stream. We would then have to be careful about
	-- always adding the "r" event, because a malicious client could
	-- send data while OpenSSL's state machine is only trying to write
	-- data (never reading from the socket), which means we'd constantly
	-- be waking up. This is an issue lots of software is probably
	-- susceptible to, actually. Could make for a cool presentation at a
	-- hacker conference ;)
	--
	-- Ultimately, if the client isn't reading our data fast enough, why
	-- should we read his? It would complicate our flow control logic,
	-- and make SSL event management a headache.
	--
	if flush then
		local ok, why = self.socket:flush()

		if not ok and why ~= EAGAIN and why ~= EPIPE then
			return false, why
		end
	end

	return true
end -- chat:trysend()

function chat:loop()
	local nick = self:greet()

	Streams[self.socket] = self

	if Users[self.nick] then
		Users[self.nick]:put(string.format("# nick %s taken by %s\n", nick, self.from))
		Users[self.nick]:exit()
	end

	Users[self.nick] = self

	repeat
		--
		-- Do :tryrecv last so we poll on the proper events.
		-- :trysend won't return until it's flushed, so we don't
		-- care about send events.
		--
		-- Loop because :tryrecv may have enqueued a message to
		-- ourselves.
		--
		repeat
			assert(self:trysend(true))
			assert(self:tryrecv())
		until self.msgs.count == 0 or self.eof

		local eof, gone = self.socket:eof()

		if eof or gone then
			self:exit()
		end

		if monotime() - self.lastt > MaxIdle then
			self:exit()
		end

		if not self.eof then
			poll(self, self.msgs.condvar, 10)
		end
	until self.eof

	local _, gone = self.socket:eof()

	if not gone then
		self:trysend(false) --> send any goodbye messages
	end
end -- chat:loop

function chat:exit(force)
	self.eof = true

	self.msgs:signal()

	if force then
		self.socket:shutdown"rw"
	end
end -- chat:exit

function chat:close()
	Streams[self.socket] = nil

	if Users[self.nick] == self then
		Users[self.nick] = nil
	end

	self.socket:close() --> descriptors are a precious resource
end -- chat:close

function chat:put(msg)
	self.msgs:put(msg)
end -- chat:put


--
-- Our core block which spawns a new coroutine for each client.
--
-- ------------------------------------------------------------------------
loop:wrap(function()
	for con in port:clients() do
		loop:wrap(function()
			local chat = chat.new(con)

			-- don't let buggy code take down whole server
			local ok, why = pcall(chat.loop, chat)

			if not ok then
				io.stderr:write(string.format("%s: %s\n", chat.from, why))
			end

			chat:close()
		end)
	end
end)


--
-- Try to handle our signals cleanly by doing managed shutdown.
--
-- Do signal management in the same POSIX thread as our client loop because
-- the /die command raises SIGTERM, and raise(2) sends the signal to the
-- issuing thread, regardless of signal masks. Linux signalfd and Solaris
-- sigtimedwait won't detect signals sent to another thread.
--
-- ------------------------------------------------------------------------
loop:wrap(function()
	signal.block(signal.SIGTERM, signal.SIGHUP, signal.SIGINT)

	local sigs = signal.listen(signal.SIGTERM, signal.SIGHUP, signal.SIGINT)
	sigs:wait()

	for _, user in pairs(Users) do
		user:put"# server exiting\n"
		user:exit()
	end

	local deadline = monotime() + 1

	while next(Users) and deadline > monotime() do
		sleep(0.05)
	end

	os.exit(true)
end)


--
-- Implement a dead-man switch in case a bug in our code causes the main
-- loop to stall.
--
-- ------------------------------------------------------------------------
loop:wrap(function()
	local thr, pipe = thread.start(function(pipe)
		local cqueues = require"cqueues"
		local sleep = cqueues.sleep
		local poll = cqueues.poll
		local loop = cqueues.new()

		loop:wrap(function()
			sleep(10)

			-- try to drain socket so we don't get stale alive
			-- tokens on successive iterations.
			while pipe:recv(-32) do
				poll(pipe, 10)
			end

			io.stderr:write"main thread unresponsive\n"

			os.exit(false)
		end)

		local ok, why = loop:loop()

		io.stderr:write(string.format("dead-man thread failed: %s\n", why or "unknown error"))

		os.exit(false)
	end)

	loop:wrap(function()
		while true do
			sleep(5)
			assert(pipe:write"!\n")
		end
	end)	
end)


-- start our main loop
assert(loop:loop())
